package com.yq.canalkafka.config;

import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration.
 *
 * <p>Registers the consumer property map and a batch listener container factory whose record
 * values are decoded by canal's {@link MessageDeserializer}.
 *
 * <p>Created 2020/11/23 3:49 PM.
 *
 * @author shizan
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Value for {@code auto.commit.interval.ms}. */
    private static final String AUTO_COMMIT_INTERVAL_MS = "1000";

    /** Value for {@code request.timeout.ms}; kept larger than {@link #SESSION_TIMEOUT_MS}. */
    private static final String REQUEST_TIMEOUT_MS = "40000";

    /** Value for {@code session.timeout.ms} (30 seconds). */
    private static final String SESSION_TIMEOUT_MS = "30000";

    /** Value for {@code max.poll.records}: upper bound on records handed over per poll. */
    private static final int MAX_POLL_RECORDS = 1000;

    /** Externally configured connection and consumer settings. */
    @Autowired
    private PropsConfig propsConfig;

    /**
     * Creates the listener container factory used for batch consumption.
     *
     * <p>The factory is built on {@link #consumerConfigs()}, has batch listening enabled, takes its
     * poll timeout from {@link PropsConfig} and uses manual acknowledgment.
     *
     * <p>The value type is declared as {@code Object} rather than {@code String} because the
     * configured value deserializer is {@link MessageDeserializer}, not a string deserializer.
     *
     * @return the configured batch listener container factory
     */
    @Bean
    KafkaListenerContainerFactory<?> batchFactory() {
        DefaultKafkaConsumerFactory<String, Object> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfigs());

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Deliver records to listeners in batches.
        factory.setBatchListener(true);

        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setPollTimeout(propsConfig.getPollTimeout());
        // NOTE(review): manual acknowledgment is only meaningful when enable.auto.commit is
        // false; confirm the value supplied by PropsConfig#getEnableAutoCommit.
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    /**
     * Builds the Kafka consumer properties.
     *
     * <p>Bootstrap servers, group id, client id and the auto-commit flag come from
     * {@link PropsConfig}; the remaining settings are fixed in this class.
     *
     * @return a new mutable map of consumer configuration keys to values
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();

        // Settings supplied by external configuration.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, propsConfig.getClientId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS);
        // When no committed offset exists, start reading from the latest offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Must stay larger than the session.timeout.ms value set below.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Number of messages received per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);

        // Keys are plain strings; values are decoded by canal's MessageDeserializer.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class.getName());
        return props;
    }

}
